Planning and Query Decomposition for Complex Retrieval

Plan-and-execute agents, sub-question generation, and multi-hop retrieval over heterogeneous data sources

Published

July 9, 2025

Keywords: planning, query decomposition, plan-and-execute, sub-question generation, multi-hop retrieval, multi-hop QA, plan-and-solve, self-ask, IRCoT, least-to-most, heterogeneous data sources, LangGraph, cognitive architecture, retrieval agent, compositionality gap, query routing, chain-of-thought retrieval

Introduction

Ask a retrieval agent “What is RAG?” and a single vector search + LLM call suffices. Ask it “How does the cost of hosting a fine-tuned Llama 3 model on vLLM compare to using GPT-4o through the API for a RAG pipeline processing 10,000 queries per day?” and the agent is stuck — no single retrieval can answer the question because the answer requires composing facts from multiple sources, in a specific order, with intermediate reasoning between retrievals.

This is the multi-hop retrieval problem. The user’s question isn’t atomic — it decomposes into sub-questions, each requiring its own retrieval from potentially different data sources, and the answer to one sub-question may determine what the next sub-question should be.

Research confirms the difficulty. Press et al. (2022) introduced the compositionality gap: the fraction of questions for which a model answers every individual sub-question correctly but still fails to compose them into a correct multi-hop answer. They found that scaling model size improves single-hop recall faster than multi-hop composition, so bigger models do not automatically close the gap. What narrows it is explicit decomposition: strategies like self-ask, where the model generates and answers its own follow-up questions before tackling the original.

The same insight applies to retrieval agents. Instead of throwing a complex query at a single retriever and hoping for the best, plan the retrieval: decompose the question into sub-questions, determine which data source each sub-question needs, retrieve answers sequentially or in parallel, and compose the final response from intermediate results.

This article implements three increasingly sophisticated planning strategies for retrieval agents:

  1. Plan-and-execute — generate a full plan upfront, execute each step with a retrieval agent, optionally re-plan based on intermediate results
  2. Sub-question decomposition — break complex queries into atomic sub-questions, retrieve and answer each independently, then synthesize
  3. Interleaved retrieval-reasoning (IRCoT) — alternate between chain-of-thought reasoning and retrieval, where each reasoning step determines the next retrieval

We build each pattern in LangGraph with working code, compare them on different query types, and show how to route sub-questions across heterogeneous data sources (vector stores, SQL databases, APIs, knowledge graphs).

Why Single-Shot Retrieval Fails for Complex Queries

The Compositionality Gap

Consider a multi-hop question: “Who is the CEO of the company that acquired the maker of ChatGPT?”

A human decomposes this naturally:

  1. Who made ChatGPT? → OpenAI
  2. Which company acquired OpenAI? → (This is a trick — OpenAI hasn’t been acquired, but Microsoft invested heavily)
  3. Who is the CEO of that company? → Satya Nadella

A standard RAG pipeline embeds the full question, retrieves chunks that happen to match some words, and often gets a garbled answer because no single document contains the full reasoning chain.

graph TD
    subgraph SingleShot["Single-Shot RAG"]
        A["Complex Query"] --> B["Embed full query"]
        B --> C["Retrieve top-k chunks"]
        C --> D["Generate answer"]
        D --> E["❌ Partial or wrong answer"]
    end

    subgraph Decomposed["Decomposed Retrieval"]
        F["Complex Query"] --> G["Decompose into<br/>sub-questions"]
        G --> H["Sub-Q1: Retrieve + Answer"]
        G --> I["Sub-Q2: Retrieve + Answer<br/>(uses Sub-Q1 result)"]
        G --> J["Sub-Q3: Retrieve + Answer<br/>(uses Sub-Q2 result)"]
        J --> K["Compose final answer"]
        K --> L["✅ Grounded multi-hop answer"]
    end

    style SingleShot fill:#fef2f2,stroke:#ef4444
    style Decomposed fill:#f0fdf4,stroke:#22c55e

Types of Multi-Hop Complexity

Not all complex queries are the same. The planning strategy depends on the type of complexity:

| Complexity Type | Example | Planning Strategy |
| --- | --- | --- |
| Sequential composition | “What year was the director of Inception born?” | Chain of dependent sub-questions |
| Parallel composition | “Compare salaries of data scientists in SF vs NYC” | Independent sub-questions, parallel retrieval |
| Conditional branching | “If the API has rate limits, calculate cost; otherwise, estimate latency” | Plan with decision points |
| Heterogeneous sources | “What do our docs say about X and what does the database show for Y?” | Route sub-questions to different retrievers |
| Iterative refinement | “Find the best framework — check benchmarks, community size, and docs quality” | Retrieve, evaluate, retrieve more |

A good planning agent handles all of these. Let’s build one.
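The mapping from complexity type to strategy can be made explicit up front. A toy dispatcher (the labels and function name here are illustrative, and a real agent would first have an LLM classify the query) might look like:

```python
# Illustrative mapping from query complexity to planning strategy.
# The keys are hypothetical classifier outputs, not a fixed taxonomy.
STRATEGY_BY_COMPLEXITY = {
    "sequential": "plan_and_execute",
    "parallel": "sub_question_decomposition",
    "conditional": "plan_and_execute_with_replan",
    "heterogeneous": "sub_question_decomposition_with_routing",
    "iterative": "ircot",
}

def pick_strategy(complexity_type: str) -> str:
    # Default to plan-and-execute when the classifier is unsure
    return STRATEGY_BY_COMPLEXITY.get(complexity_type, "plan_and_execute")
```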

Plan-and-Execute Agents

The Architecture

The Plan-and-Execute paradigm separates planning from execution. Instead of the ReAct loop (think → act → observe → think again), the agent first generates a complete plan, then executes each step:

graph TD
    A["User Query"] --> B["Planner LLM"]
    B --> C["Step 1: Search for X"]
    B --> D["Step 2: Query DB for Y"]
    B --> E["Step 3: Calculate Z"]
    B --> F["Step 4: Compose answer"]

    C --> G["Executor Agent"]
    G --> H["Result 1"]
    H --> D
    D --> I["Executor Agent"]
    I --> J["Result 2"]
    J --> E
    E --> K["Executor Agent"]
    K --> L["Result 3"]
    L --> F
    F --> M["Executor Agent"]
    M --> N["Final Answer"]

    style B fill:#9b59b6,color:#fff,stroke:#333
    style G fill:#e67e22,color:#fff,stroke:#333
    style I fill:#e67e22,color:#fff,stroke:#333
    style K fill:#e67e22,color:#fff,stroke:#333
    style M fill:#e67e22,color:#fff,stroke:#333

Benefits over pure ReAct:

  • The planner sees the full scope of the task before any execution happens
  • The executor focuses on one step at a time — simpler tool selection, less context pollution
  • The plan itself is inspectable and auditable before execution
  • Different LLMs can be used for planning (powerful, expensive) vs. execution (efficient, cheap)

Tradeoffs:

  • More LLM calls total (planning + execution per step)
  • The initial plan may be wrong — requires re-planning capability
  • Less adaptive than ReAct for tasks where the next step depends entirely on the previous result

Implementation in LangGraph

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
import json

planner_llm = ChatOpenAI(model="gpt-4o", temperature=0)
executor_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


class PlanExecuteState(TypedDict):
    messages: Annotated[list, add_messages]
    plan: list[str]            # List of planned steps
    current_step: int          # Index of the step being executed
    step_results: list[dict]   # Results from executed steps
    final_answer: str


def plan_step(state: PlanExecuteState) -> dict:
    """Generate a plan to answer the complex query."""
    query = state["messages"][-1].content

    response = planner_llm.invoke([
        {"role": "system", "content": (
            "You are a research planning expert. Given a complex question, "
            "decompose it into a sequence of concrete retrieval and reasoning steps.\n\n"
            "Rules:\n"
            "- Each step should be a single, actionable task\n"
            "- Steps can reference results from previous steps as 'result of step N'\n"
            "- The final step should always be 'Compose the final answer from all results'\n"
            "- Output ONLY a JSON array of step strings\n"
            "- Use 3-7 steps maximum"
        )},
        {"role": "user", "content": f"Question: {query}"},
    ])

    try:
        plan = json.loads(response.content)
    except json.JSONDecodeError:
        # Fallback: treat the response as a single step
        plan = [response.content.strip(), "Compose the final answer from all results"]

    return {"plan": plan, "current_step": 0, "step_results": []}


def execute_step(state: PlanExecuteState) -> dict:
    """Execute the current step in the plan."""
    step_idx = state["current_step"]
    step = state["plan"][step_idx]
    previous_results = state.get("step_results", [])

    # Build context from previous step results
    context = ""
    if previous_results:
        context = "Previous results:\n"
        for i, result in enumerate(previous_results):
            context += f"Step {i + 1}: {result['step']}\nResult: {result['result']}\n\n"

    response = executor_llm.invoke([
        {"role": "system", "content": (
            "You are a research execution agent with access to retrieval tools. "
            "Execute the given step using the provided context from previous steps. "
            "Be factual and concise."
        )},
        {"role": "user", "content": f"{context}Current task: {step}"},
    ])

    new_results = list(previous_results)
    new_results.append({"step": step, "result": response.content})

    return {"step_results": new_results, "current_step": step_idx + 1}


def should_continue(state: PlanExecuteState) -> str:
    """Check if there are more steps to execute."""
    if state["current_step"] >= len(state["plan"]):
        return "compose"
    return "execute"


def compose_answer(state: PlanExecuteState) -> dict:
    """Compose the final answer from all step results."""
    query = state["messages"][-1].content
    results = state.get("step_results", [])

    results_text = "\n\n".join(
        f"**Step {i + 1}**: {r['step']}\n**Result**: {r['result']}"
        for i, r in enumerate(results)
    )

    response = planner_llm.invoke([
        {"role": "system", "content": (
            "Compose a comprehensive answer to the original question "
            "using the research results below. Cite which step each fact comes from."
        )},
        {"role": "user", "content": (
            f"Original question: {query}\n\n"
            f"Research results:\n{results_text}"
        )},
    ])

    return {
        "messages": [{"role": "assistant", "content": response.content}],
        "final_answer": response.content,
    }


# Build the graph
graph = StateGraph(PlanExecuteState)
graph.add_node("plan", plan_step)
graph.add_node("execute", execute_step)
graph.add_node("compose", compose_answer)

graph.add_edge(START, "plan")
graph.add_edge("plan", "execute")
graph.add_conditional_edges("execute", should_continue, {
    "execute": "execute",
    "compose": "compose",
})
graph.add_edge("compose", END)

checkpointer = MemorySaver()
plan_execute_agent = graph.compile(checkpointer=checkpointer)

Adding Re-Planning

The initial plan is a best guess. After each execution step, the agent may discover that the plan needs adjustment — a step returned unexpected results, a data source was unavailable, or new information changes the remaining steps.

class PlanExecuteWithReplanState(TypedDict):
    messages: Annotated[list, add_messages]
    plan: list[str]
    current_step: int
    step_results: list[dict]
    replan_count: int
    final_answer: str


def maybe_replan(state: PlanExecuteWithReplanState) -> dict:
    """Evaluate whether the plan needs adjustment after the latest step."""
    if state["current_step"] >= len(state["plan"]):
        return {}  # Plan complete, no replanning needed

    results = state.get("step_results", [])
    remaining_steps = state["plan"][state["current_step"]:]
    latest = results[-1] if results else None

    response = planner_llm.invoke([
        {"role": "system", "content": (
            "You are evaluating whether a research plan needs adjustment.\n"
            "Given the latest result and remaining steps, decide:\n"
            "1. CONTINUE — the plan is still valid\n"
            "2. REPLAN — output a revised list of remaining steps as JSON array\n\n"
            "Output format: {\"action\": \"CONTINUE\"} or "
            "{\"action\": \"REPLAN\", \"new_steps\": [...]}"
        )},
        {"role": "user", "content": (
            f"Latest result: {latest}\n\n"
            f"Remaining steps: {remaining_steps}"
        )},
    ])

    try:
        decision = json.loads(response.content)
    except json.JSONDecodeError:
        return {}

    if decision.get("action") == "REPLAN" and "new_steps" in decision:
        completed_steps = state["plan"][:state["current_step"]]
        new_plan = completed_steps + decision["new_steps"]
        return {"plan": new_plan, "replan_count": state.get("replan_count", 0) + 1}

    return {}


def should_continue_with_replan(state: PlanExecuteWithReplanState) -> str:
    if state["current_step"] >= len(state["plan"]):
        return "compose"
    if state.get("replan_count", 0) > 3:
        return "compose"  # Safety: don't replan forever
    return "replan"


# Build the graph with replanning
graph = StateGraph(PlanExecuteWithReplanState)
graph.add_node("plan", plan_step)
graph.add_node("execute", execute_step)
graph.add_node("replan", maybe_replan)
graph.add_node("compose", compose_answer)

graph.add_edge(START, "plan")
graph.add_edge("plan", "execute")
graph.add_conditional_edges("execute", should_continue_with_replan, {
    "replan": "replan",
    "compose": "compose",
})
graph.add_edge("replan", "execute")
graph.add_edge("compose", END)

plan_execute_replan_agent = graph.compile()

graph TD
    A["User Query"] --> B["Plan"]
    B --> C["Execute Step"]
    C --> D{"More steps?"}
    D -->|Yes| E["Re-Plan?"]
    E -->|Plan OK| C
    E -->|Adjusted| F["Updated Plan"]
    F --> C
    D -->|No| G["Compose Answer"]
    G --> H["Final Response"]

    style B fill:#9b59b6,color:#fff,stroke:#333
    style C fill:#e67e22,color:#fff,stroke:#333
    style E fill:#f5a623,color:#fff,stroke:#333
    style G fill:#1abc9c,color:#fff,stroke:#333

Plan-and-Execute vs. ReAct

| Aspect | ReAct | Plan-and-Execute |
| --- | --- | --- |
| Planning horizon | One step at a time | Full plan upfront |
| Adaptability | Highly adaptive — each step sees all history | Needs explicit re-planning |
| Context growth | Full history accumulates in prompt | Only step results, not full trace |
| Debuggability | Inspect each thought step | Inspect the plan itself |
| LLM calls | 1 per reasoning step | 1 for plan + 1 per step + 1 for composition |
| Best for | Simple tool routing, < 5 steps | Complex multi-step research, 5-15 steps |
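The LLM-call comparison can be sketched as simple arithmetic (a toy model that ignores tool-call overhead and token counts):

```python
def react_llm_calls(steps: int) -> int:
    # ReAct: one reasoning call per think -> act -> observe step
    return steps

def plan_execute_llm_calls(steps: int, replans: int = 0) -> int:
    # One planning call, one executor call per step,
    # one re-planning call per revision, one composition call
    return 1 + steps + replans + 1

# For a 10-step task: ReAct makes 10 calls,
# plan-and-execute makes 12 (plus any re-plans)
```

The overhead is a constant two calls, so plan-and-execute gets relatively cheaper as tasks get longer, while the planner/executor split lets the per-step calls run on a cheaper model.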

Sub-Question Decomposition

The Pattern

Sub-question decomposition explicitly breaks a complex query into independent or dependent atomic questions, each answerable by a single retrieval:

graph TD
    A["Complex Query:<br/>'Compare RAG costs: self-hosted vs API'"] --> B["Decomposer LLM"]
    B --> C["SQ1: What are typical<br/>self-hosted RAG costs?"]
    B --> D["SQ2: What are typical<br/>API-based RAG costs?"]
    B --> E["SQ3: What are the<br/>hidden costs of each?"]

    C --> F["Retrieve & Answer"]
    D --> G["Retrieve & Answer"]
    E --> H["Retrieve & Answer"]

    F --> I["Synthesizer LLM"]
    G --> I
    H --> I
    I --> J["Comprehensive<br/>Comparison"]

    style B fill:#9b59b6,color:#fff,stroke:#333
    style I fill:#1abc9c,color:#fff,stroke:#333

This is related to the self-ask method from Press et al. (2022) and the least-to-most prompting approach from Zhou et al. (2022). Both demonstrate that explicit sub-question generation dramatically improves multi-hop QA — self-ask improves accuracy by letting the model “ask itself” follow-up questions before answering, and plugging in a search engine to answer those follow-ups improves accuracy further.
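The self-ask format is compact enough to show inline. Below is a few-shot skeleton paraphrased from the paper's running example (the exact prompt in Press et al. differs in details):

```python
# Paraphrased self-ask few-shot example; the model continues this
# pattern for new questions, emitting its own follow-ups.
SELF_ASK_EXAMPLE = """\
Question: Who lived longer, Theodor Haecker or Harry Vaughan Watkins?
Are follow up questions needed here: Yes.
Follow up: How old was Theodor Haecker when he died?
Intermediate answer: Theodor Haecker was 65 years old when he died.
Follow up: How old was Harry Vaughan Watkins when he died?
Intermediate answer: Harry Vaughan Watkins was 69 years old when he died.
So the final answer is: Harry Vaughan Watkins.
"""
```

In retrieval-augmented self-ask, each "Follow up:" line becomes a search query and the retrieved result is pasted in as the "Intermediate answer:".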

Implementation: Parallel Sub-Question Decomposition

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0)
fast_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


class SubQuestionState(TypedDict):
    messages: Annotated[list, add_messages]
    sub_questions: list[dict]   # [{id, question, depends_on, source_hint}]
    answers: dict               # {question_id: answer}
    final_answer: str


def decompose_query(state: SubQuestionState) -> dict:
    """Break the complex query into atomic sub-questions."""
    query = state["messages"][-1].content

    response = llm.invoke([
        {"role": "system", "content": (
            "Decompose this complex question into atomic sub-questions.\n\n"
            "For each sub-question, specify:\n"
            "- id: a unique identifier (sq1, sq2, ...)\n"
            "- question: the sub-question text\n"
            "- depends_on: list of sub-question IDs whose answers are needed "
            "  before this one can be answered (empty list if independent)\n"
            "- source_hint: suggested data source "
            "  (vector_store, sql_database, api, knowledge_graph, web_search)\n\n"
            "Output as JSON array. Order sub-questions so dependencies come first.\n"
            "Generate 2-6 sub-questions maximum."
        )},
        {"role": "user", "content": query},
    ])

    try:
        sub_questions = json.loads(response.content)
    except json.JSONDecodeError:
        sub_questions = [{"id": "sq1", "question": query, "depends_on": [], "source_hint": "vector_store"}]

    return {"sub_questions": sub_questions, "answers": {}}


def answer_sub_questions(state: SubQuestionState) -> dict:
    """Answer all sub-questions, respecting dependency order."""
    sub_questions = state["sub_questions"]
    answers = dict(state.get("answers", {}))

    for sq in sub_questions:
        sq_id = sq["id"]
        if sq_id in answers:
            continue

        # Check if dependencies are met
        deps = sq.get("depends_on", [])
        if not all(d in answers for d in deps):
            continue

        # Build context from dependency answers
        dep_context = ""
        if deps:
            dep_context = "Context from previous answers:\n"
            for dep_id in deps:
                dep_context += f"- {dep_id}: {answers[dep_id]}\n"

        # Route to the appropriate retrieval source
        source = sq.get("source_hint", "vector_store")
        source_instruction = _get_source_instruction(source)

        response = fast_llm.invoke([
            {"role": "system", "content": (
                f"Answer this sub-question using the specified data source.\n"
                f"Source: {source}\n{source_instruction}\n"
                "Be factual and concise. If you don't have enough information, "
                "say so explicitly."
            )},
            {"role": "user", "content": f"{dep_context}\nQuestion: {sq['question']}"},
        ])
        answers[sq_id] = response.content

    return {"answers": answers}


def _get_source_instruction(source: str) -> str:
    """Return source-specific retrieval instructions."""
    instructions = {
        "vector_store": "Search the document knowledge base for relevant passages.",
        "sql_database": "Query the structured database for exact data points.",
        "api": "Call the appropriate API endpoint for real-time data.",
        "knowledge_graph": "Traverse entity relationships in the knowledge graph.",
        "web_search": "Search the web for up-to-date information.",
    }
    return instructions.get(source, "Use any available source.")


def check_all_answered(state: SubQuestionState) -> str:
    """Check if all sub-questions have been answered."""
    answers = state.get("answers", {})
    sub_questions = state.get("sub_questions", [])
    if len(answers) >= len(sub_questions):
        return "synthesize"
    return "answer"


def synthesize_answer(state: SubQuestionState) -> dict:
    """Compose the final answer from all sub-question answers."""
    query = state["messages"][-1].content
    sub_questions = state["sub_questions"]
    answers = state.get("answers", {})

    qa_text = "\n\n".join(
        f"**{sq['id']}**: {sq['question']}\n"
        f"**Source**: {sq.get('source_hint', 'unknown')}\n"
        f"**Answer**: {answers.get(sq['id'], 'Not answered')}"
        for sq in sub_questions
    )

    response = llm.invoke([
        {"role": "system", "content": (
            "Synthesize a comprehensive answer from the sub-question results below. "
            "Integrate all facts into a coherent response. "
            "Note which source each piece of information came from."
        )},
        {"role": "user", "content": (
            f"Original question: {query}\n\n"
            f"Sub-question results:\n{qa_text}"
        )},
    ])

    return {
        "messages": [{"role": "assistant", "content": response.content}],
        "final_answer": response.content,
    }


# Build the graph
graph = StateGraph(SubQuestionState)
graph.add_node("decompose", decompose_query)
graph.add_node("answer", answer_sub_questions)
graph.add_node("synthesize", synthesize_answer)

graph.add_edge(START, "decompose")
graph.add_edge("decompose", "answer")
graph.add_conditional_edges("answer", check_all_answered, {
    "answer": "answer",
    "synthesize": "synthesize",
})
graph.add_edge("synthesize", END)

sub_question_agent = graph.compile()

Dependency-Aware Execution

The key insight is the depends_on field. Some sub-questions are independent (can be retrieved in parallel), while others depend on prior answers:

graph TD
    Q["What is the performance gap between<br/>fine-tuned Llama 3 and GPT-4o for<br/>RAG pipelines, and what drives the cost?"]

    Q --> SQ1["SQ1: What is Llama 3's typical<br/>RAG accuracy?<br/>depends_on: []"]
    Q --> SQ2["SQ2: What is GPT-4o's typical<br/>RAG accuracy?<br/>depends_on: []"]
    Q --> SQ3["SQ3: What does fine-tuning<br/>Llama 3 cost?<br/>depends_on: []"]
    Q --> SQ4["SQ4: What is the performance gap?<br/>depends_on: [sq1, sq2]"]
    Q --> SQ5["SQ5: What drives the cost difference?<br/>depends_on: [sq3, sq2]"]

    SQ1 --> SQ4
    SQ2 --> SQ4
    SQ2 --> SQ5
    SQ3 --> SQ5

    style SQ1 fill:#56cc9d,stroke:#333,color:#fff
    style SQ2 fill:#56cc9d,stroke:#333,color:#fff
    style SQ3 fill:#56cc9d,stroke:#333,color:#fff
    style SQ4 fill:#6cc3d5,stroke:#333,color:#fff
    style SQ5 fill:#6cc3d5,stroke:#333,color:#fff

SQ1, SQ2, and SQ3 have no dependencies — they can be answered in parallel. SQ4 and SQ5 depend on prior answers and must wait. The answer_sub_questions node iterates until all questions are resolved, handling this dependency ordering naturally. One caveat: if the decomposer ever emits a dependency cycle, no sub-question becomes ready and the answer/check loop spins forever, so production code should cap loop iterations or validate the dependency graph before executing.
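The ordering logic can be isolated from the LLM calls entirely. A small scheduler (a sketch; schedule_waves is our own helper, not part of LangGraph) groups sub-question IDs into waves, where every question in a wave has all of its dependencies answered by earlier waves, so each wave can be retrieved in parallel:

```python
def schedule_waves(sub_questions: list[dict]) -> list[list[str]]:
    """Group sub-question IDs into waves of parallel-executable work.
    Each wave contains only questions whose dependencies were answered
    in earlier waves. Raises ValueError on circular dependencies."""
    pending = {sq["id"]: set(sq.get("depends_on", [])) for sq in sub_questions}
    answered: set[str] = set()
    waves: list[list[str]] = []
    while pending:
        ready = sorted(q for q, deps in pending.items() if deps <= answered)
        if not ready:
            raise ValueError(f"Circular dependency among: {sorted(pending)}")
        waves.append(ready)
        answered.update(ready)
        for q in ready:
            del pending[q]
    return waves

waves = schedule_waves([
    {"id": "sq1", "depends_on": []},
    {"id": "sq2", "depends_on": []},
    {"id": "sq3", "depends_on": []},
    {"id": "sq4", "depends_on": ["sq1", "sq2"]},
    {"id": "sq5", "depends_on": ["sq3", "sq2"]},
])
# waves == [["sq1", "sq2", "sq3"], ["sq4", "sq5"]]
```

Each wave could then be dispatched concurrently (for example with asyncio.gather), while the cycle check gives the loop a clean failure mode instead of spinning forever.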

Interleaved Retrieval-Reasoning (IRCoT)

Why Interleaving Matters

Both plan-and-execute and sub-question decomposition generate the full decomposition before any retrieval happens. This works well when the question structure is clear upfront, but fails when what to retrieve next depends on what was just retrieved.

IRCoT (Trivedi et al., 2023) addresses this by interleaving retrieval with chain-of-thought reasoning — each reasoning step generates a retrieval query, and each retrieval result informs the next reasoning step. On HotpotQA, 2WikiMultihopQA, MuSiQue, and IIRC, IRCoT improved retrieval recall by up to 21 points and downstream QA accuracy by up to 15 points over one-shot retrieval.

graph TD
    subgraph OneShot["One-Shot Retrieval"]
        A1["Query"] --> A2["Retrieve All"] --> A3["Reason"] --> A4["Answer"]
    end

    subgraph IRCoT["Interleaved Retrieval-Reasoning"]
        B1["Query"] --> B2["Reason Step 1"]
        B2 --> B3["Retrieve for Step 1"]
        B3 --> B4["Reason Step 2<br/>(uses Step 1 result)"]
        B4 --> B5["Retrieve for Step 2"]
        B5 --> B6["Reason Step 3"]
        B6 --> B7["Answer"]
    end

    style OneShot fill:#fef2f2,stroke:#ef4444
    style IRCoT fill:#f0fdf4,stroke:#22c55e

The crucial difference: in IRCoT, the retrieval query at step N is informed by the reasoning and retrieval results from steps 1 through N-1. This means the agent dynamically adjusts what it searches for based on what it has already learned.

Implementation: IRCoT Agent

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)


class IRCoTState(TypedDict):
    messages: Annotated[list, add_messages]
    original_query: str
    reasoning_chain: list[dict]   # [{thought, query, retrieved, conclusion}]
    iteration: int
    final_answer: str


def reason_and_query(state: IRCoTState) -> dict:
    """Generate the next reasoning step and retrieval query."""
    query = state.get("original_query", state["messages"][-1].content)
    chain = state.get("reasoning_chain", [])
    iteration = state.get("iteration", 0)

    # Build chain-of-thought context
    cot_context = ""
    if chain:
        cot_context = "Reasoning so far:\n"
        for i, step in enumerate(chain, 1):
            cot_context += f"\nStep {i}:\n"
            cot_context += f"  Thought: {step['thought']}\n"
            cot_context += f"  Retrieved: {step['retrieved'][:200]}\n"
            cot_context += f"  Conclusion: {step['conclusion']}\n"

    response = llm.invoke([
        {"role": "system", "content": (
            "You are reasoning step-by-step to answer a complex question.\n"
            "Based on what you know so far, generate:\n"
            "1. THOUGHT: What do you still need to find out?\n"
            "2. QUERY: A specific search query to retrieve the needed information\n\n"
            "If you have enough information to answer, instead output:\n"
            "THOUGHT: I have enough information.\n"
            "ANSWER: <your final answer>\n\n"
            "Output format (strict):\n"
            "THOUGHT: <reasoning>\n"
            "QUERY: <search query>\n"
            "OR\n"
            "THOUGHT: <reasoning>\n"
            "ANSWER: <final answer>"
        )},
        {"role": "user", "content": (
            f"Original question: {query}\n\n{cot_context}"
        )},
    ])

    text = response.content.strip()

    # Parse the response
    if "ANSWER:" in text:
        answer_part = text.split("ANSWER:", 1)[1].strip()
        return {
            "final_answer": answer_part,
            "iteration": iteration + 1,
        }

    thought = ""
    search_query = ""
    if "THOUGHT:" in text:
        thought = text.split("THOUGHT:", 1)[1].split("QUERY:", 1)[0].strip()
    if "QUERY:" in text:
        search_query = text.split("QUERY:", 1)[1].strip()

    # Store the partial reasoning step (retrieval will fill in the rest)
    new_chain = list(chain)
    new_chain.append({
        "thought": thought,
        "query": search_query,
        "retrieved": "",      # Will be filled by retrieve step
        "conclusion": "",     # Will be filled by conclude step
    })

    return {
        "original_query": query,
        "reasoning_chain": new_chain,
        "iteration": iteration + 1,
    }


def retrieve_for_step(state: IRCoTState) -> dict:
    """Retrieve documents for the current reasoning step's query."""
    chain = state.get("reasoning_chain", [])
    if not chain:
        return {}

    current_step = chain[-1]
    search_query = current_step.get("query", "")

    if not search_query:
        return {}

    # Retrieve from vector store (replace with your actual retriever)
    docs = vectorstore.similarity_search(search_query, k=3)
    retrieved_text = "\n".join(d.page_content[:300] for d in docs)

    # Update the current step with retrieved content
    updated_chain = list(chain)
    updated_chain[-1] = {**current_step, "retrieved": retrieved_text}

    return {"reasoning_chain": updated_chain}


def conclude_step(state: IRCoTState) -> dict:
    """Draw a conclusion from the retrieved information for this step."""
    chain = state.get("reasoning_chain", [])
    if not chain:
        return {}

    current_step = chain[-1]

    response = llm.invoke([
        {"role": "system", "content": (
            "Based on the thought and retrieved information, "
            "write a brief factual conclusion. One or two sentences."
        )},
        {"role": "user", "content": (
            f"Thought: {current_step['thought']}\n"
            f"Retrieved: {current_step['retrieved']}"
        )},
    ])

    updated_chain = list(chain)
    updated_chain[-1] = {**current_step, "conclusion": response.content}

    return {"reasoning_chain": updated_chain}


def should_continue_ircot(state: IRCoTState) -> str:
    """Check if the agent has produced a final answer or hit limits."""
    if state.get("final_answer"):
        return "done"
    if state.get("iteration", 0) >= 6:
        return "force_answer"
    return "retrieve"


def force_answer(state: IRCoTState) -> dict:
    """Force a final answer from whatever reasoning has been gathered."""
    query = state.get("original_query", "")
    chain = state.get("reasoning_chain", [])

    chain_text = "\n".join(
        f"- {step['conclusion']}" for step in chain if step.get("conclusion")
    )

    response = llm.invoke([
        {"role": "system", "content": "Answer the question using only the facts below."},
        {"role": "user", "content": f"Question: {query}\n\nFacts:\n{chain_text}"},
    ])

    return {
        "messages": [{"role": "assistant", "content": response.content}],
        "final_answer": response.content,
    }


def format_answer(state: IRCoTState) -> dict:
    """Format the final answer as a message."""
    return {
        "messages": [{"role": "assistant", "content": state["final_answer"]}],
    }


# Build the IRCoT graph
graph = StateGraph(IRCoTState)
graph.add_node("reason", reason_and_query)
graph.add_node("retrieve", retrieve_for_step)
graph.add_node("conclude", conclude_step)
graph.add_node("force_answer", force_answer)
graph.add_node("format_answer", format_answer)

graph.add_edge(START, "reason")
graph.add_conditional_edges("reason", should_continue_ircot, {
    "retrieve": "retrieve",
    "done": "format_answer",
    "force_answer": "force_answer",
})
graph.add_edge("retrieve", "conclude")
graph.add_edge("conclude", "reason")
graph.add_edge("force_answer", END)
graph.add_edge("format_answer", END)

ircot_agent = graph.compile()
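The inline THOUGHT/QUERY/ANSWER string handling is the most fragile part of reason_and_query. Factoring it into a standalone helper (our own refactor, equivalent to the inline logic above) makes it unit-testable:

```python
def parse_ircot_step(text: str) -> dict:
    """Parse the strict THOUGHT/QUERY/ANSWER output format into a dict.
    Exactly one of 'query' or 'answer' is populated for well-formed input."""
    result = {"thought": "", "query": "", "answer": ""}
    if "THOUGHT:" in text:
        after = text.split("THOUGHT:", 1)[1]
        # The thought ends where either marker begins
        result["thought"] = after.split("QUERY:", 1)[0].split("ANSWER:", 1)[0].strip()
    if "ANSWER:" in text:
        result["answer"] = text.split("ANSWER:", 1)[1].strip()
    elif "QUERY:" in text:
        result["query"] = text.split("QUERY:", 1)[1].strip()
    return result
```

With this in place, reason_and_query reduces to one LLM call plus a dictionary check, and malformed outputs (neither marker present) degrade to an empty query that retrieve_for_step already skips.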

When to Use IRCoT

IRCoT excels when the sub-questions aren’t known upfront — when each retrieval step reveals what needs to be retrieved next. This is common in:

  • Exploratory research — “What factors contributed to X?” (you don’t know the factors until you start retrieving)
  • Chain reasoning — “Who mentored the person who invented Y?” (each hop depends on the previous)
  • Conditional retrieval — “If the system uses X architecture, check for A; otherwise check for B”

Multi-Hop Retrieval over Heterogeneous Sources

The Routing Problem

Real-world retrieval agents don’t query a single vector store. They need to route sub-questions to the right data source:

| Data Source | Best For | Example Query |
| --- | --- | --- |
| Vector store | Semantic search over documents | “What are best practices for chunking?” |
| SQL database | Exact counts, aggregations, structured data | “How many users signed up last month?” |
| REST API | Real-time data, external services | “What is the current price of X?” |
| Knowledge graph | Entity relationships, traversals | “Who reports to the VP of Engineering?” |
| Web search | Up-to-date information not in local sources | “What was announced at the latest conference?” |
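Before reaching for an LLM router, it is worth seeing how far cheap heuristics go. A keyword-based baseline (the rules below are purely illustrative) makes the routing task concrete and can serve as a fallback when the LLM router returns unparseable output:

```python
def heuristic_route(question: str) -> str:
    """Rough keyword routing; illustrative rules only, a real system
    would learn or LLM-classify these."""
    q = question.lower()
    if any(w in q for w in ("how many", "average", "sum", "last month")):
        return "sql_database"
    if any(w in q for w in ("current", "price", "weather", "right now")):
        return "api"
    if any(w in q for w in ("reports to", "related to", "connected")):
        return "knowledge_graph"
    if any(w in q for w in ("latest", "announced", "news")):
        return "web_search"
    return "vector_store"  # default: semantic document search
```

The default-to-vector-store choice mirrors the fallbacks used in the decomposers above; the LLM router that follows handles everything these shallow rules miss.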

Source-Aware Query Router

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0)


class MultiSourceState(TypedDict):
    messages: Annotated[list, add_messages]
    sub_questions: list[dict]
    routed_questions: list[dict]  # [{question, source, answer}]
    final_answer: str


def decompose_and_route(state: MultiSourceState) -> dict:
    """Decompose the query and assign each sub-question to a data source."""
    query = state["messages"][-1].content

    response = llm.invoke([
        {"role": "system", "content": (
            "Decompose this question into sub-questions and route each to the "
            "best data source.\n\n"
            "Available sources:\n"
            "- vector_store: technical documentation, articles, guides\n"
            "- sql_database: user data, metrics, transaction records\n"
            "- api: real-time pricing, stock data, weather\n"
            "- knowledge_graph: entity relationships, org charts, taxonomies\n"
            "- web_search: recent news, announcements, current events\n\n"
            "Output JSON array: [{\"id\": \"sq1\", \"question\": \"...\", "
            "\"source\": \"...\", \"depends_on\": []}]\n"
            "Generate 2-5 sub-questions."
        )},
        {"role": "user", "content": query},
    ])

    try:
        sub_questions = json.loads(response.content)
    except json.JSONDecodeError:
        sub_questions = [{"id": "sq1", "question": query, "source": "vector_store", "depends_on": []}]

    return {"sub_questions": sub_questions, "routed_questions": []}


def execute_routed_queries(state: MultiSourceState) -> dict:
    """Execute each sub-question against its assigned data source."""
    sub_questions = state["sub_questions"]
    answered = {rq["id"]: rq for rq in state.get("routed_questions", [])}
    new_answered = list(state.get("routed_questions", []))

    for sq in sub_questions:
        if sq["id"] in answered:
            continue

        # Check dependencies
        deps = sq.get("depends_on", [])
        if not all(d in answered for d in deps):
            continue

        # Gather dependency context
        dep_context = ""
        for dep_id in deps:
            dep_context += f"{dep_id}: {answered[dep_id]['answer']}\n"

        # Route to the appropriate retrieval function
        source = sq["source"]
        answer = _retrieve_from_source(source, sq["question"], dep_context)

        result = {**sq, "answer": answer}
        new_answered.append(result)
        answered[sq["id"]] = result

    return {"routed_questions": new_answered}


def _retrieve_from_source(source: str, question: str, context: str) -> str:
    """Route retrieval to the appropriate data source."""
    if source == "vector_store":
        docs = vectorstore.similarity_search(question, k=3)
        doc_text = "\n".join(d.page_content[:300] for d in docs)
        return _answer_from_context(question, doc_text, context)

    elif source == "sql_database":
        # Generate and execute SQL
        sql = _generate_sql(question, context)
        result = db.execute(sql)
        return f"SQL result: {result}"

    elif source == "api":
        # Call the appropriate API
        return _call_api(question, context)

    elif source == "knowledge_graph":
        # Query the knowledge graph
        cypher = _generate_cypher(question, context)
        result = graph_db.query(cypher)
        return f"Graph result: {result}"

    elif source == "web_search":
        # Web search fallback
        results = web_search(question)
        return _answer_from_context(question, results, context)

    return f"Unknown source: {source}"


def _answer_from_context(question: str, retrieved: str, dep_context: str) -> str:
    """Answer a sub-question from retrieved context."""
    response = ChatOpenAI(model="gpt-4o-mini", temperature=0).invoke([
        {"role": "system", "content": "Answer the question using ONLY the provided context."},
        {"role": "user", "content": (
            f"Context from previous steps: {dep_context}\n\n"
            f"Retrieved documents:\n{retrieved}\n\n"
            f"Question: {question}"
        )},
    ])
    return response.content


def _generate_sql(question: str, context: str) -> str:
    """Generate a SQL query from a natural language question."""
    response = ChatOpenAI(model="gpt-4o-mini", temperature=0).invoke([
        {"role": "system", "content": (
            "Generate a SQL query to answer this question. "
            "Return ONLY the SQL, no explanation. "
            "Use standard PostgreSQL syntax."
        )},
        {"role": "user", "content": f"Context: {context}\nQuestion: {question}"},
    ])
    return response.content


def _generate_cypher(question: str, context: str) -> str:
    """Generate a Cypher query for Neo4j from a natural language question."""
    response = ChatOpenAI(model="gpt-4o-mini", temperature=0).invoke([
        {"role": "system", "content": (
            "Generate a Cypher query to answer this question. "
            "Return ONLY the Cypher query, no explanation."
        )},
        {"role": "user", "content": f"Context: {context}\nQuestion: {question}"},
    ])
    return response.content


def check_all_routed(state: MultiSourceState) -> str:
    """Loop back to execution until every sub-question has an answer."""
    answered = {rq["id"] for rq in state.get("routed_questions", [])}
    all_ids = {sq["id"] for sq in state.get("sub_questions", [])}
    if answered >= all_ids:
        return "synthesize"
    return "execute"


def synthesize_multi_source(state: MultiSourceState) -> dict:
    """Synthesize the final answer from multi-source results."""
    query = state["messages"][-1].content
    results = state.get("routed_questions", [])

    results_text = "\n\n".join(
        f"**{r['id']}** ({r['source']}): {r['question']}\n{r['answer']}"
        for r in results
    )

    response = llm.invoke([
        {"role": "system", "content": (
            "Synthesize a comprehensive answer from results gathered across "
            "multiple data sources. Note which source each fact comes from."
        )},
        {"role": "user", "content": f"Question: {query}\n\nResults:\n{results_text}"},
    ])

    return {
        "messages": [{"role": "assistant", "content": response.content}],
        "final_answer": response.content,
    }


# Build the multi-source graph
graph = StateGraph(MultiSourceState)
graph.add_node("decompose_and_route", decompose_and_route)
graph.add_node("execute", execute_routed_queries)
graph.add_node("synthesize", synthesize_multi_source)

graph.add_edge(START, "decompose_and_route")
graph.add_edge("decompose_and_route", "execute")
graph.add_conditional_edges("execute", check_all_routed, {
    "execute": "execute",
    "synthesize": "synthesize",
})
graph.add_edge("synthesize", END)

multi_source_agent = graph.compile()
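One failure mode the loop above does not guard against: a cycle in `depends_on` (e.g. sq1 → sq2 → sq1) leaves `execute_routed_queries` unable to make progress, so `check_all_routed` routes back to `execute` forever. A standalone validator run right after decomposition can catch this. The sketch below assumes the `{id, depends_on}` shape used above:

```python
def has_dependency_cycle(sub_questions: list[dict]) -> bool:
    """Return True if the depends_on graph has a cycle or a dangling id."""
    deps = {sq["id"]: set(sq.get("depends_on", [])) for sq in sub_questions}
    resolved: set[str] = set()
    # Kahn-style peeling: repeatedly resolve questions whose deps are all met.
    while len(resolved) < len(deps):
        ready = [
            sq_id for sq_id, d in deps.items()
            if sq_id not in resolved and d <= resolved
        ]
        if not ready:  # nothing resolvable -> cycle or unknown reference
            return True
        resolved.update(ready)
    return False
```

If it returns `True`, the agent can re-prompt the decomposer or fall back to treating every sub-question as independent, rather than spinning in the execute loop.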

Source Selection Matrix

How should the router decide which source to use? The decomposer LLM handles this via instructions, but here’s the mental model:

graph TD
    A["Sub-Question"] --> B{"Question Type?"}
    B -->|"Conceptual / How-to"| C["Vector Store<br/>(semantic search)"]
    B -->|"Quantitative / Exact"| D["SQL Database<br/>(structured query)"]
    B -->|"Real-time / External"| E["API / Web Search<br/>(live data)"]
    B -->|"Relationship / Graph"| F["Knowledge Graph<br/>(traversal)"]
    B -->|"Comparative"| G["Multiple Sources<br/>(fan-out)"]

    style B fill:#f5a623,color:#fff,stroke:#333
    style C fill:#56cc9d,stroke:#333,color:#fff
    style D fill:#6cc3d5,stroke:#333,color:#fff
    style E fill:#ff7851,stroke:#333,color:#fff
    style F fill:#9b59b6,stroke:#333,color:#fff
    style G fill:#e67e22,stroke:#333,color:#fff

Comparing Planning Strategies

Strategy Selection Guide

| Strategy | Pre-plans | Adapts Mid-Execution | Parallel Sub-Qs | Multi-Source | Best For |
| --- | --- | --- | --- | --- | --- |
| ReAct (baseline) | No | Yes (every step) | No | Via tool routing | Simple 1-5 step tasks |
| Plan-and-Execute | Full plan | Via re-planning | Sequential | Manual routing | Structured multi-step research |
| Sub-Question Decomposition | Decomposition | Limited | Yes (independent sub-Qs) | Per sub-question | Parallel fact-gathering |
| IRCoT | No (step-by-step) | Yes (each step) | No | Per retrieval step | Chain reasoning, exploration |
| Multi-Source Router | Decomposition + routing | Limited | Yes (same-level sub-Qs) | Native | Heterogeneous data landscapes |

Decision Tree

graph TD
    A["Complex query?"] -->|No| B["Use ReAct"]
    A -->|Yes| C{"Sub-questions<br/>known upfront?"}
    C -->|Yes| D{"Independent<br/>or dependent?"}
    C -->|No| E["Use IRCoT<br/>(interleaved)"]
    D -->|Mostly independent| F{"Multiple<br/>data sources?"}
    D -->|Sequential chain| G["Use Plan-and-Execute<br/>(with re-planning)"]
    F -->|Yes| H["Use Multi-Source<br/>Router"]
    F -->|No| I["Use Sub-Question<br/>Decomposition"]

    style B fill:#56cc9d,stroke:#333,color:#fff
    style E fill:#9b59b6,stroke:#333,color:#fff
    style G fill:#e67e22,stroke:#333,color:#fff
    style H fill:#6cc3d5,stroke:#333,color:#fff
    style I fill:#ff7851,stroke:#333,color:#fff
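The same decision tree can be written as a small function, which is handy as a default when wiring an agent factory. The boolean feature names below are illustrative assumptions, not a library API:

```python
def choose_strategy(
    is_complex: bool,
    subquestions_known: bool,
    mostly_independent: bool,
    multiple_sources: bool,
) -> str:
    """Map the decision-tree questions to a planning strategy name."""
    if not is_complex:
        return "react"
    if not subquestions_known:
        return "ircot"          # decomposition emerges during retrieval
    if not mostly_independent:
        return "plan_and_execute"  # sequential chain with re-planning
    if multiple_sources:
        return "multi_source_router"
    return "sub_question_decomposition"
```

Encoding the choice in code rather than prompting for it keeps strategy selection deterministic and testable, in the spirit of the domain-specific architectures discussed later.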

Example Query Routing

| Query | Strategy | Why |
| --- | --- | --- |
| “What is RAG?” | ReAct (single retrieval) | Atomic question, one source |
| “Compare chunking strategies: fixed vs. semantic vs. agentic” | Sub-Question Decomposition | Three independent sub-questions |
| “Who is the CTO of the company that developed the model beating GPT-4 on MMLU?” | IRCoT | Each hop depends on the previous |
| “Summarize our Q3 revenue from the database and compare with industry trends from reports” | Multi-Source Router | SQL for revenue + vector store for reports |
| “Research the top 5 vector databases, benchmark their performance, and recommend one for our use case” | Plan-and-Execute | Multi-step research with defined phases |

Domain-Specific Cognitive Architectures

Why General Planning Isn’t Enough

Harrison Chase’s “Planning for Agents” argues that nearly all production agents use domain-specific cognitive architectures — not general-purpose planning. General approaches like plan-and-solve show improvement on benchmarks, but production systems need architectures tailored to their specific task.

The insight: rather than asking the LLM to plan, encode the planning logic in code. The LLM handles reasoning and tool selection within each step, but the flow between steps is deterministic and purpose-built.

Example: Research Report Pipeline

A domain-specific architecture for generating research reports over multiple data sources:

class ResearchReportState(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str
    # Phase 1: Scope
    scope_questions: list[str]
    # Phase 2: Gather
    source_results: dict          # {source_name: [results]}
    # Phase 3: Analyze
    key_findings: list[str]
    contradictions: list[str]
    gaps: list[str]
    # Phase 4: Synthesize
    report: str


def scope_topic(state: ResearchReportState) -> dict:
    """Phase 1: Define the research scope and generate targeted questions."""
    topic = state["messages"][-1].content

    response = llm.invoke([
        {"role": "system", "content": (
            "You are scoping a research report. Generate 3-5 specific, "
            "retrievable questions that together cover the topic comprehensively. "
            "Output as JSON array of strings."
        )},
        {"role": "user", "content": f"Topic: {topic}"},
    ])

    try:
        questions = json.loads(response.content)
    except json.JSONDecodeError:
        questions = [topic]

    return {"topic": topic, "scope_questions": questions}


def gather_from_docs(state: ResearchReportState) -> dict:
    """Phase 2a: Retrieve from document store."""
    results = dict(state.get("source_results", {}))
    doc_results = []

    for question in state.get("scope_questions", []):
        docs = vectorstore.similarity_search(question, k=3)
        for doc in docs:
            doc_results.append({
                "question": question,
                "content": doc.page_content,
                "source": doc.metadata.get("source", "docs"),
            })

    results["documents"] = doc_results
    return {"source_results": results}


def gather_from_database(state: ResearchReportState) -> dict:
    """Phase 2b: Query structured data."""
    results = dict(state.get("source_results", {}))
    db_results = []

    for question in state.get("scope_questions", []):
        # fast_llm: a cheaper model for routine checks,
        # e.g. ChatOpenAI(model="gpt-4o-mini", temperature=0)
        response = fast_llm.invoke([
            {"role": "system", "content": (
                "If this question can be answered with a SQL query, generate the SQL. "
                "If not, respond with 'SKIP'. Return ONLY the SQL or 'SKIP'."
            )},
            {"role": "user", "content": question},
        ])
        if "SKIP" not in response.content.upper():
            # Execute SQL query (replace with actual DB)
            db_results.append({
                "question": question,
                "sql": response.content,
                "source": "database",
            })

    results["database"] = db_results
    return {"source_results": results}


def analyze_findings(state: ResearchReportState) -> dict:
    """Phase 3: Analyze all gathered results for insights and contradictions."""
    all_results = state.get("source_results", {})

    flat_results = []
    for source_name, items in all_results.items():
        for item in items:
            flat_results.append(f"[{source_name}] {item.get('content', item)}")

    results_text = "\n\n".join(flat_results[:20])  # Limit context

    response = llm.invoke([
        {"role": "system", "content": (
            "Analyze these research results. Identify:\n"
            "1. key_findings: Main facts and insights (JSON array)\n"
            "2. contradictions: Conflicting information (JSON array)\n"
            "3. gaps: What's missing or needs more research (JSON array)\n\n"
            "Output as JSON: {\"key_findings\": [...], \"contradictions\": [...], \"gaps\": [...]}"
        )},
        {"role": "user", "content": f"Topic: {state['topic']}\n\nResults:\n{results_text}"},
    ])

    try:
        analysis = json.loads(response.content)
    except json.JSONDecodeError:
        analysis = {"key_findings": [], "contradictions": [], "gaps": []}

    return {
        "key_findings": analysis.get("key_findings", []),
        "contradictions": analysis.get("contradictions", []),
        "gaps": analysis.get("gaps", []),
    }


def synthesize_report(state: ResearchReportState) -> dict:
    """Phase 4: Generate the final research report."""
    response = llm.invoke([
        {"role": "system", "content": (
            "Write a concise research report based on the analysis below. "
            "Structure: Overview, Key Findings, Potential Issues, Conclusions. "
            "Cite sources where possible."
        )},
        {"role": "user", "content": (
            f"Topic: {state['topic']}\n\n"
            f"Key Findings:\n" + "\n".join(f"- {f}" for f in state.get("key_findings", [])) + "\n\n"
            f"Contradictions:\n" + "\n".join(f"- {c}" for c in state.get("contradictions", [])) + "\n\n"
            f"Gaps:\n" + "\n".join(f"- {g}" for g in state.get("gaps", []))
        )},
    ])

    return {
        "messages": [{"role": "assistant", "content": response.content}],
        "report": response.content,
    }


# Build the domain-specific pipeline
graph = StateGraph(ResearchReportState)
graph.add_node("scope", scope_topic)
graph.add_node("gather_docs", gather_from_docs)
graph.add_node("gather_db", gather_from_database)
graph.add_node("analyze", analyze_findings)
graph.add_node("synthesize", synthesize_report)

graph.add_edge(START, "scope")
graph.add_edge("scope", "gather_docs")
graph.add_edge("gather_docs", "gather_db")
graph.add_edge("gather_db", "analyze")
graph.add_edge("analyze", "synthesize")
graph.add_edge("synthesize", END)

research_pipeline = graph.compile()

The flow is not planned by the LLM — it’s hardcoded as Scope → Gather → Analyze → Synthesize. The LLM handles reasoning within each step, but the transitions between steps are deterministic. This is what Harrison Chase calls “domain-specific cognitive architectures” — the most reliable pattern for production agents.
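One robustness note that applies to every LLM-JSON step above (`decompose_and_route`, `scope_topic`, `analyze_findings`): a bare `json.loads` raises whenever the model wraps its output in a markdown fence, silently triggering the fallback branch even though the payload was valid. A small defensive parser, shown as a sketch (`parse_json_loose` is not a LangChain API), strips an optional fence first:

```python
import json
import re


def parse_json_loose(text: str):
    """Parse JSON that may be wrapped in a ```json ... ``` markdown fence."""
    # Strip an optional code fence around the payload before parsing.
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    payload = match.group(1) if match else text.strip()
    return json.loads(payload)
```

Swapping `json.loads(response.content)` for `parse_json_loose(response.content)` in those nodes makes the fallback branch fire only on genuinely malformed output.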

Common Pitfalls and How to Fix Them

| Pitfall | Symptom | Fix |
| --- | --- | --- |
| Over-decomposition | Simple questions split into 5+ sub-questions | Check query complexity first — route simple queries directly to retrieval |
| Plan never executes | Planner generates vague or impossible steps | Add step validation — each step must reference a concrete tool or source |
| Cascading errors | Wrong answer in step 1 corrupts all later steps | Add verification after each step; implement re-planning on low confidence |
| Source mismatch | Sub-question routed to wrong data source | Improve routing prompt with source descriptions and examples |
| Context explosion | All intermediate results accumulate in prompt | Summarize step results before passing to next step |
| Infinite re-planning | Re-plan triggers after every step | Cap re-plans (max 2-3); only re-plan when step results contradict the plan |
| Parallel fetch bottleneck | Independent sub-questions answered sequentially | Use async execution or LangGraph’s parallel node execution |
| No stopping condition | IRCoT loops forever on open-ended queries | Set max iterations (5-8); force answer when limit is reached |
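For the over-decomposition pitfall specifically, a cheap pre-check can route simple queries straight to single-shot retrieval before any planner runs. The heuristic below is a sketch; the word-count threshold and signal words are assumptions to tune per domain:

```python
def looks_complex(query: str) -> bool:
    """Heuristic gate: send only queries with multi-hop signals to the planner."""
    signals = (
        "compare", " and ", " vs", "versus", "both",
        "difference between", "for each", "as well as",
    )
    q = query.lower()
    signal_hits = sum(1 for s in signals if s in q)
    # Long queries, or queries with multiple composition signals, get decomposed.
    return len(q.split()) > 12 or signal_hits >= 2
```

An LLM classifier is more accurate but costs a call per query; a string heuristic like this filters the obvious cases for free and lets the planner handle the rest.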

Conclusion

Complex retrieval questions require planning — no single-shot retrieval can compose facts from multiple sources, resolve multi-hop dependencies, or route sub-questions to heterogeneous data stores. The planning strategy you choose depends on the nature of the complexity.

Key takeaways:

  • Single-shot RAG fails for multi-hop questions. The compositionality gap means models that answer simple questions well still fail to compose multiple facts. Explicit decomposition is the fix.
  • Plan-and-execute separates planning from execution. The planner generates a full step-by-step plan; the executor handles one step at a time. Add re-planning for adaptability. Best for structured, multi-phase research tasks.
  • Sub-question decomposition breaks queries into atomic questions with dependency tracking. Independent sub-questions can run in parallel. Each can be routed to a different data source. Best for parallel fact-gathering across known dimensions.
  • IRCoT (interleaved retrieval-reasoning) alternates between chain-of-thought reasoning and retrieval. What to retrieve next is determined by what was just learned. Best for exploratory or chain-dependent questions where the decomposition isn’t known upfront.
  • Heterogeneous source routing assigns each sub-question to the right backend — vector store for concepts, SQL for metrics, APIs for real-time data, knowledge graphs for relationships. The decomposer doubles as a router.
  • Domain-specific cognitive architectures beat general-purpose planning for production systems. Encode the task flow in code (Scope → Gather → Analyze → Synthesize) and let the LLM handle reasoning within each step. This is how reliable agents are actually built.

Start with sub-question decomposition for most multi-hop retrieval tasks. Move to plan-and-execute when the task has clearly defined phases. Use IRCoT for exploratory chain reasoning. And when building for production, design a domain-specific pipeline where the flow is deterministic and the LLM handles step-level intelligence.

References
